package com.wcq.dessert.core;

import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.FiberExecutorScheduler;
import co.paralleluniverse.fibers.SuspendExecution;
import com.wcq.dessert.demo.FriendServiceProxy;
import com.wcq.dessert.support.Log;

import java.util.*;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Port{
    /** 所在进程 */
    Node node;
    /** 线程名字 */
    private String name;

    /** 消息队列，接收其他线程发送的Call请求 */
    private Queue<Call> calls= new ConcurrentLinkedDeque<>();
    /** 本此心跳要处理的Call请求 */
    private List<Call> pulseCalls= new ArrayList<>();

    /** 消息队列，接收其他线程发送的CallResult请求 */
    private Queue<Call> callResults= new ConcurrentLinkedDeque<>();
    /** 本此心跳要处理的CallResult请求 */
    private List<Call> pulseResultCalls= new ArrayList<>();

    /** 管理的service */
    private Map<String, Service> services = new HashMap<>();
    /** 单线程调度器 */
    FiberExecutorScheduler fiberScheduler;

    /** 主协程，处理tick */
    PulseFiber pulseFiber;
    /** 协程池，回收 */
    public FiberPool pool;

    /** 当前线程 */
    private final static InheritableThreadLocal<Port> local = new InheritableThreadLocal<>();

    /** contextId生成 */
    public long contextIdCreater;
    /** 当前正在执行的Context */
    public long contextIdCur;
    /** 里面包含当前正在执行的context和异步等待回调的的context */
    public Map<Long, CallContext> contexts = new HashMap<>();

    public Port(String name){
        this.name = name;
        // 单线程线程池
        ExecutorService ses = Executors.newSingleThreadExecutor();
        this.fiberScheduler = new FiberExecutorScheduler(this.name, ses);
    }

    public void startUp(Node node){
        this.node = node;
        node.addPort(this);

        fiberScheduler.execute(() -> {
            Log.console("start");
            // fiberScheduler的启动主协程
            pulseFiber = new PulseFiber(this);
        });
    }

    /**
     * run in：pulsePort
     * 初始化
     */
    public void init() {
        local.set(this);
        pool = new FiberPool(this);
    }

    /**
     * run in：pulsePort
     * tick心跳
     */
    public void pulse() throws SuspendExecution, InterruptedException {
        while (true) {
            pulseAffirm();

            pulseCall();
            pulseCallResults();

            pulseService();

            // 其他需要线程启动的操作

//            Log.console("{}", pool.size());

            Fiber.sleep(20);

        }
    }

    private void pulseService() {
        for (Service service : services.values()) {
            service.pulse();
        }
    }


    private void pulseCall() throws SuspendExecution, InterruptedException {
        if (pulseCalls.isEmpty()){
            return;
        }
        Log.console("pulseCall 即将处理[{}]个call", pulseCalls.size());
        for (Call call : pulseCalls) {
            dispatchCall(call);
        }
        Log.console("pulseCall 处理完成[{}]个call", pulseCalls.size());

        pulseCalls.clear();
    }

    private void pulseCallResults() throws SuspendExecution, InterruptedException {
        if (pulseResultCalls.isEmpty()){
            return;
        }
        Log.console("pulseResultCalls 即将处理[{}]个call", pulseResultCalls.size());
        for (Call call : pulseResultCalls) {
            dispatchCallResult(call);
        }
        Log.console("pulseResultCalls 处理完成[{}]个call", pulseResultCalls.size());

        pulseResultCalls.clear();
    }


    private void dispatchCall(Call call) throws SuspendExecution, InterruptedException {
        Service service = services.get(call.to.getServId());
        if(service == null){
            System.out.println(name + "下service不存在 " + call.to.getServId());
            return;
        }

        // 申请一个协程
        PortFiber fiber = pool.apply();
        Log.console("处理第 [{}] 个call step1", call.sn);
        contextIdCur = ++contextIdCreater;
        CallContext context = new CallContext(contextIdCur, call, service, fiber);
        contexts.put(contextIdCur, context);

        fiber.send(context);
        // 清理
        service.port.contextIdCur = 0;
        fiber.tryClose();

        Log.console("处理第 [{}] 个call step2", call.sn);
    }

    private void dispatchCallResult(Call call) throws SuspendExecution, InterruptedException {
        CallContext context = contexts.get(call.callbackId);
        if (context == null){
            Log.console("context消失了????");
            return;
        }
        // 当前环境
        contextIdCur = call.callbackId;
        Log.console("处理第 [{}] 个callResult step1", call.sn);
        context.fiber.result(call.result);
        context.fiber.tryClose();
        Log.console("处理第 [{}] 个callResult step2", call.sn);

    }

    /**
     * 防止与生产者线程CAS并发过多，放入本线程的pulseCalls和pulseResultCalls后再慢慢处理
     */
    private void pulseAffirm() {
        // 本次心跳要执行的call
        Call call = null;
        while ((call = calls.poll()) != null) {
            pulseCalls.add(call);
        }
        // 本次心跳要执行的callResult
        while ((call = callResults.poll()) != null) {
            pulseResultCalls.add(call);
        }
    }

    public void addCall(Call call) {
        if (call.type == 1){
            calls.add(call);
        }else{
            callResults.add(call);
        }
    }

    public void addService(Service service) {
        services.put(service.name, service);
    }

    public static Port getCurrent(){
        return local.get();
//        return ThreadLocalMap.get();
//        return local.get();
    }

    public Node getNode(){
        return node;
    }

    public String getName() {
        return name;
    }

    /**
     * 发送call请求
     * @param callPoint
     * @param methodKey
     * @param params
     */
    public void call(CallPoint callPoint, int methodKey, Object[] params){
        Call call = new Call();
        call.fromNode = getNode().getName();
        call.fromPort = this.name;

        call.type = 1;

        call.to = callPoint;

        call.methodKey = methodKey;
        call.methodParam = params;

        // 发送到目标线程
        getNode().addCall(call);
    }

    /**
     * 发送call请求，并阻塞协程等待返回
     * @param callPoint
     * @param methodKey
     * @param params
     * @return
     * @throws SuspendExecution
     * @throws InterruptedException
     */
    public Object callWait(CallPoint callPoint, int methodKey, Object[] params) throws SuspendExecution, InterruptedException {
        CallContext context = contexts.get(this.contextIdCur);


        Call call = new Call();
        call.fromNode = getNode().getName();
        call.fromPort = this.name;

        call.type = 1;
        call.needResult = true;

        call.to = callPoint;
        call.sn = context.call.sn;

        call.methodKey = methodKey;
        call.methodParam = params;

        call.callbackId = this.contextIdCur;

        // 发送到目标线程
        node.dispatch(call);

        Log.console("channel wait 第 [{}] 个call step1", context.call.sn);
        Object result = context.fiber.getWait();
        Log.console("channel wait 第 [{}] 个call step2", context.call.sn);

        return result;

    }
}
